package com.seean.spark.service

import java.sql.Timestamp
import java.util.Date

import com.seean.spark.model._
import com.seean.spark.model.base.BaseModel
import com.seean.spark.model.schemas.Schemas
import com.seean.spark.utils.{PropertiesUtil, TableViewUtil, Utils}
import com.seean.spark.vo.USER
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ListBuffer

/**
  * Computes value-domain and rule checks over one database table's data.
  *
  * @author wsx
  * @date 2020/6/24 15:26
  */
object CheckUserData extends Serializable {

  // Application configuration, loaded once at object initialization.
  val properties = PropertiesUtil.loadProperties()

  // SparkSession shared by every computation in this job.
  val appName = properties.getProperty("app.name")
  val spark = SparkSession.builder().appName(appName).config("spark.sql.broadcastTimeout", "20000").getOrCreate()

  // Reference DDL / seed data for the table this job reads (kept for documentation):
  //  CREATE TABLE T_USER (
  //    ID NUMBER(10) NULL ,
  //    NAME VARCHAR2(255 BYTE) NULL ,
  //    POINT NUMBER(11,2) NULL ,
  //    AGE NUMBER(10) NULL ,
  //    BIRTH TIMESTAMP(6)  NULL
  //  );
  //
  //  INSERT INTO T_USER VALUES ('1', 'tom', '1.10', '18', TO_TIMESTAMP(' 2020-06-29 10:28:54:000000', 'YYYY-MM-DD HH24:MI:SS:FF6'));

  // JDBC connection settings.
  val driver = properties.getProperty("db.driver")
  val connection = properties.getProperty("db.connection")
  val dbName = properties.getProperty("db.name")
  val dbPwd = properties.getProperty("db.pwd")
  var tableList: List[String] = properties.getProperty("db.table.names").split(",").toList

  // Register each configured table as a temporary Spark view so it can be queried below.
  TableViewUtil.creatTmpAndView(connection, dbName, dbPwd, tableList, spark)

  val sc = spark.sparkContext

  // Implicit encoders required for the .as[USER] Dataset conversion below.
  import spark.implicits._

  // If aggregate/time/data functions are needed for the computation, import:
  // import org.apache.spark.sql.functions._

  // All rows of T_USER with a non-null ID, as a typed Dataset.
  final val noSelectUser = spark.sql("select * from T_USER where ID is not null").as[USER]

  // Log the inferred schema for debugging.
  noSelectUser.printSchema()

  // Lookup map (ID -> NAME) collected on the driver. This example reuses the
  // user rows themselves; in real use this would be rule / dictionary /
  // value-domain data needed by the per-row checks.
  val userMap = noSelectUser.rdd.map(f => (f.ID, f.NAME)).collectAsMap()

  // Broadcast the lookup map so every executor gets one read-only copy
  // instead of shipping it with every task.
  val userMapBc = sc.broadcast(userMap)

  /**
    * Entry point: runs the validation pass and reports the wall-clock time.
    */
  def main(args: Array[String]): Unit = {
    // NOTE(review): this configures the named logger "log", not the root
    // logger — confirm that is the intent.
    Logger.getLogger("log").setLevel(Level.ERROR)
    val start = new Date()
    handleData
    val end = new Date()
    println("--------------------------------------总用时: " + (end.getTime - start.getTime) + "ms")
  }

  /**
    * Repartitions the source rows into chunks of roughly 5000 rows, runs the
    * per-row validation logic on each partition, and persists the results.
    */
  def handleData: Unit = {
    // Broadcast lookup data, available on every executor (not yet used by the
    // placeholder logic below).
    val checkMap = userMapBc.value
    // The data to process.
    val sourceData = noSelectUser

    val count = sourceData.count()

    // Rows per partition; partitions = ceil(count / chunkSize), minimum 1.
    // (The previous computation left the partition count at 0 when the row
    // count was a non-zero exact multiple of the chunk size, which made
    // repartition(0) fail.)
    val chunkSize = 5000L
    val repartitionSize: Int = math.max(1L, (count + chunkSize - 1) / chunkSize).toInt

    // Process each partition, producing one saveResult per input row.
    val rel = sourceData.rdd.repartition(repartitionSize).mapPartitions(rows => {
      val results = new ListBuffer[saveResult]()
      while (rows.hasNext) {
        val user = rows.next()
        // TODO: real validation/rule logic goes here; the output below is a
        // hard-coded placeholder record.
        val result = new saveResult
        val added = new User
        added.id = 2
        added.name = "lucy"
        added.point = 1.2
        added.age = 18
        added.birth = new Timestamp(new Date().getTime)
        result.userList = List(added)
        results.append(result)
      }
      results.iterator
    })

    try {
      // Flatten each result's user list into Rows and persist them.
      val t1 = rel.map(f => f.userList).flatMap(v => v).map(item => Row(item.toList(): _*))
      save(t1, Schemas.userSchema, "T_USER")
    } catch {
      case e: Exception => println("-------------save error !" + e)
    }
  }

  /**
    * Builds a DataFrame from the given rows and schema, then batch-writes it
    * to the named table.
    *
    * @param pars      rows to persist
    * @param schema    schema describing the rows
    * @param tableName destination table name
    */
  def save[T <: BaseModel](pars: RDD[Row], schema: StructType, tableName: String): Unit = {
    val df = spark.createDataFrame(pars, schema)
    Utils.dfBacthWrite(df, tableName)
  }

}
